python day24(airflow)

Today I'll introduce airflow, a handy tool that can replace cron jobs. With cron you have to estimate how long each job takes and schedule around that, and when jobs are spread across several machines there is no way to see the overall workflow; you have to log in to each machine and read its cron settings to figure out what runs when. With airflow these jobs can be defined as a single flow, which makes the whole workflow easy to see.

Install airflow

pip3 install apache-airflow

airflow needs a database to store its metadata; here the default SQLite DB is used.

> airflow initdb
[2019-10-09 11:04:41,590] {__init__.py:51} INFO - Using executor SequentialExecutor
DB: sqlite:////Users/daniel/airflow/airflow.db
[2019-10-09 11:04:41,994] {db.py:369} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/alembic/ddl/sqlite.py:39: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
  "Skipping unsupported ALTER for "
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
......
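
The DB path shown above sits under airflow's home directory (~/airflow by default). If you want airflow to use another database, the connection string is controlled by the sql_alchemy_conn setting in airflow.cfg; with the default SQLite setup it looks roughly like this (the exact value depends on your AIRFLOW_HOME):

sql_alchemy_conn = sqlite:////Users/daniel/airflow/airflow.db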

airflow already ships with sample DAGs to test with; the tutorial sample looks like this:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
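
As a side note, the set_upstream calls above are just one way to declare dependencies; the bitshift form used in the workflow example later in this post builds exactly the same graph. A small illustrative sketch (not part of the shipped sample):

# these two lines...
t2.set_upstream(t1)
t3.set_upstream(t1)
# ...declare the same dependencies as:
t1 >> t2
t1 >> t3
# (recent 1.10 releases also accept a list form: t1 >> [t2, t3])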

To run a test, use the airflow command. Since this is a test run, the first argument is test; the second argument, tutorial, is the DAG id defined in the code above; the third argument, print_date, is one of the task_ids defined above; and the fourth argument, 2015-06-01, is the execution date. The run hit a ValueError: unknown locale: UTF-8 error.

> airflow test tutorial print_date 2015-06-01
[2019-10-09 11:28:07,867] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-09 11:28:08,188] {dagbag.py:90} INFO - Filling up the DagBag from /Users/daniel/airflow/dags
[2019-10-09 11:28:08,234] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>
[2019-10-09 11:28:08,238] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>
[2019-10-09 11:28:08,238] {taskinstance.py:838} INFO -
--------------------------------------------------------------------------------
[2019-10-09 11:28:08,238] {taskinstance.py:839} INFO - Starting attempt 1 of 2
[2019-10-09 11:28:08,238] {taskinstance.py:840} INFO -
--------------------------------------------------------------------------------
[2019-10-09 11:28:08,239] {taskinstance.py:859} INFO - Executing <Task(BashOperator): print_date> on 2015-06-01T00:00:00+00:00
[2019-10-09 11:28:08,247] {taskinstance.py:1051} ERROR - unknown locale: UTF-8
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 895, in _run_raw_task
    context = self.get_template_context()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1133, in get_template_context
    ds = self.execution_date.strftime('%Y-%m-%d')
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pendulum/mixins/default.py", line 124, in strftime
    return self.format(fmt, _locale.getlocale()[0], 'classic')
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/locale.py", line 587, in getlocale
    return _parse_localename(localename)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/locale.py", line 495, in _parse_localename
    raise ValueError('unknown locale: %s' % localename)
ValueError: unknown locale: UTF-8

Add the following two lines to ~/.bash_profile, then source ~/.bash_profile:

export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8

Re-running the test now works:

> airflow test tutorial print_date 2015-06-01
[2019-10-09 12:14:08,800] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-09 12:14:09,117] {dagbag.py:90} INFO - Filling up the DagBag from /Users/daniel/airflow/dags
[2019-10-09 12:14:09,163] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>
[2019-10-09 12:14:09,167] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>
[2019-10-09 12:14:09,167] {taskinstance.py:838} INFO -
--------------------------------------------------------------------------------
[2019-10-09 12:14:09,167] {taskinstance.py:839} INFO - Starting attempt 1 of 2
[2019-10-09 12:14:09,167] {taskinstance.py:840} INFO -
--------------------------------------------------------------------------------
[2019-10-09 12:14:09,168] {taskinstance.py:859} INFO - Executing <Task(BashOperator): print_date> on 2015-06-01T00:00:00+00:00
[2019-10-09 12:14:09,180] {bash_operator.py:81} INFO - Tmp dir root location:
 /var/folders/8v/9r5mf0y506734gvj_j337w7w0000gn/T
[2019-10-09 12:14:09,181] {bash_operator.py:91} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=tutorial
AIRFLOW_CTX_TASK_ID=print_date
AIRFLOW_CTX_EXECUTION_DATE=2015-06-01T00:00:00+00:00
[2019-10-09 12:14:09,182] {bash_operator.py:105} INFO - Temporary script location: /var/folders/8v/9r5mf0y506734gvj_j337w7w0000gn/T/airflowtmpquddjput/print_datef1o0u8do
[2019-10-09 12:14:09,182] {bash_operator.py:115} INFO - Running command: date
[2019-10-09 12:14:09,194] {bash_operator.py:124} INFO - Output:
[2019-10-09 12:14:09,200] {bash_operator.py:128} INFO - Tue Oct  8 12:14:09 CST 2019
[2019-10-09 12:14:09,201] {bash_operator.py:132} INFO - Command exited with return code 0

Workflow test

First, there are three Python programs, each of which just prints its own message.
work_flow1.py

print('this is local workflow1')

work_flow2.py

print('this is remote workflow2')

work_flow3.py

print('this is local workflow3')

The workflow to test is listed below. Once the workflow is decided, write the DAG definition file: create a work_flow.py. Four objects are involved here: SSHHook, BashOperator, SFTPOperator, and SSHOperator.

  1. First, run work_flow1.py on the local machine (using BashOperator)
  2. Then copy work_flow2.py to another host (using SSHHook and SFTPOperator)
  3. Run work_flow2.py on that other host (using SSHHook and SSHOperator)
  4. Then run work_flow3.py on the local machine (using BashOperator)
  5. Chain steps 1, 2, 3, and 4 together into one flow.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.sftp_operator import SFTPOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook

# Define the default arguments. `owner` must be given; `start_date` is the date
# to start running from. If it were set to datetime(2015, 6, 1) with a daily
# schedule_interval, airflow would backfill runs for June 1, June 2, June 3, ...
# all the way up to the current date.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 8),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('myworkflowtest', default_args=default_args, schedule_interval=timedelta(days=1))

# Define an SSHHook that holds the connection info for the remote host
myhook = SSHHook(
    remote_host='192.168.1.11',
    username='test',
    password='test123',
    port=22,
)
# 1. First, run work_flow1.py on the local machine (BashOperator)
my_flow1 = BashOperator(
    task_id='run_work_flow1',
    bash_command='python3 /Volumes/Transcend/pylearn/work_flow1.py',
    dag=dag)
# 2. Copy work_flow2.py to the other host (SSHHook + SFTPOperator)
my_flow2_1 = SFTPOperator(
    task_id="copy_work_flow2",
    ssh_hook=myhook,
    local_filepath="/Volumes/Transcend/pylearn/work_flow2.py",
    remote_filepath="/home/miuser/pylearn/work_flow2.py",
    operation="put",
    create_intermediate_dirs=True,
    dag=dag
)
# 3. Run work_flow2.py on the other host (SSHHook + SSHOperator)
my_flow2_2 = SSHOperator(
    task_id="run_work_flow2",
    ssh_hook=myhook,
    command='python3 /home/miuser/pylearn/work_flow2.py',
    dag=dag
)
# 4. Then run work_flow3.py on the local machine (BashOperator)
my_flow3 = BashOperator(
    task_id='run_work_flow3',
    bash_command='python3 /Volumes/Transcend/pylearn/work_flow3.py',
    dag=dag)

# 5. Chain steps 1, 2, 3, and 4 together
my_flow1 >> my_flow2_1 >> my_flow2_2 >> my_flow3
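
One note on the hard-coded credentials above: instead of putting the host, username, and password in the DAG file, SSHHook (and SSHOperator/SFTPOperator) can also take an ssh_conn_id that refers to a Connection stored in airflow's own metadata DB, for example one created through the web UI under Admin > Connections. A minimal sketch, assuming a connection with the id my_ssh_conn already exists (the id here is made up for illustration):

# reference a pre-created airflow Connection instead of inline credentials
myhook = SSHHook(ssh_conn_id='my_ssh_conn')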

Once the file is written, running it hits No module named 'paramiko':

> python3 work_flow.py
Traceback (most recent call last):
  File "work_flow.py", line 3, in <module>
    from airflow.contrib.operators.sftp_operator import SFTPOperator
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/contrib/operators/sftp_operator.py", line 21, in <module>
    from airflow.contrib.hooks.ssh_hook import SSHHook
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/contrib/hooks/ssh_hook.py", line 24, in <module>
    import paramiko
ModuleNotFoundError: No module named 'paramiko'

Install the paramiko module:

pip3 install paramiko

Running it again hits No module named 'sshtunnel':

> python3 work_flow.py
Traceback (most recent call last):
  File "work_flow.py", line 3, in <module>
    from airflow.contrib.operators.sftp_operator import SFTPOperator
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/contrib/operators/sftp_operator.py", line 21, in <module>
    from airflow.contrib.hooks.ssh_hook import SSHHook
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/contrib/hooks/ssh_hook.py", line 26, in <module>
    from sshtunnel import SSHTunnelForwarder
ModuleNotFoundError: No module named 'sshtunnel'

Install the sshtunnel module:

pip3 install sshtunnel
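
As an aside, the paramiko and sshtunnel dependencies can most likely also be pulled in together through airflow's ssh extra instead of one by one (assuming that extra is available for the installed airflow version):

pip3 install 'apache-airflow[ssh]'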

After that, the script finally runs without errors:

python3 work_flow.py

Next, copy this file into airflow's dags directory; if /Users/daniel/airflow/dags does not exist, create it yourself.

> cp work_flow.py /Users/daniel/airflow/dags

The path above comes from the dags_folder setting in the /Users/daniel/airflow/airflow.cfg configuration file:

dags_folder = /Users/daniel/airflow/dags

Run the airflow list_dags command to see which DAGs are currently available. The DAG defined above, myworkflowtest, shows up, which means it was registered successfully.

> airflow list_dags
[2019-10-09 14:13:51,691] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-09 14:13:52,028] {dagbag.py:90} INFO - Filling up the DagBag from /Users/daniel/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
......
latest_only
latest_only_with_trigger
myworkflowtest
test_utils
tutorial
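
Before handing the DAG over to the scheduler, its individual tasks can be smoke-tested the same way as the tutorial earlier, for example (same airflow test syntax as before; the date is just a sample execution date):

airflow test myworkflowtest run_work_flow1 2019-10-08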

Once everything is defined, start the airflow scheduler; the scheduler manages when each DAG runs.

> airflow scheduler

Start the airflow webserver; once it is up, open http://localhost:8080 to see the web UI.

> airflow webserver
[2019-10-09 11:16:50,303] {__init__.py:51} INFO - Using executor SequentialExecutor
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-10-09 11:16:50,962] {dagbag.py:90} INFO - Filling up the DagBag from /Users/daniel/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================
[2019-10-09 11:16:51 +0800] [8013] [INFO] Starting gunicorn 19.9.0
[2019-10-09 11:16:51 +0800] [8013] [INFO] Listening at: http://0.0.0.0:8080 (8013)
[2019-10-09 11:16:51 +0800] [8013] [INFO] Using worker: sync
......
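
If you prefer the command line, the DAG can also be switched on and kicked off without the web UI, using the standard CLI subcommands of this airflow version (shown only as an alternative; the screenshots below go through the UI):

airflow unpause myworkflowtest
airflow trigger_dag myworkflowtest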

Switch the defined DAG on and trigger it:

https://ithelp.ithome.com.tw/upload/images/20191009/20107343UsiTMdSTMD.jpg

https://ithelp.ithome.com.tw/upload/images/20191009/20107343w9uv0b7koc.jpg

You can see that all four tasks eventually succeed and turn green.

https://ithelp.ithome.com.tw/upload/images/20191009/20107343NIw5I3FbRw.jpg

With airflow, separate programs are combined into a single data flow without defining any cron jobs, and the different programs run in order, which is very convenient. airflow has many more features and details worth exploring.

